- It reduces communication cost, which speeds up query performance when executing lookup or join operations. Spark actions are executed through a set of stages, separated by distributed "shuffle" operations.
- Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted in this way is cached in serialized form and de-serialized before running each task.
- Broadcast variables are mostly used when the tasks across multiple stages require the same data or when caching the data in the de-serialized form is required.
- The spark.sql.autoBroadcastJoinThreshold configuration sets the maximum size of a table that Spark SQL will automatically broadcast for a join; by default it is 10 MB.
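The threshold can be changed, or automatic broadcasting disabled, at runtime. A minimal fragment, assuming an active SparkSession named spark (the value is given in bytes):

```scala
// Raise the auto-broadcast threshold to 50 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)
// Or disable automatic broadcast joins entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```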
Imagine that while doing a transformation we need to look up a large table of zip codes/pin codes. It is neither feasible to send this large lookup table to the executors with every task, nor can we query the database every time. The solution is to convert the lookup table into a broadcast variable; Spark will cache it on every executor for future reference.
// A lookup table mapping pincodes to city names (example values)
val pincode_map = Map(11 -> "Delhi", 12 -> "Jaipur", 14 -> "Chandigarh", 17 -> "Shimla", 18 -> "Jammu", 19 -> "Srinagar")
val list_of_pincodes: List[Int] = List(14, 19, 18, 18, 12, 17, 11, 19, 17, 19, 19)
val RDDpincodes = sc.parallelize(list_of_pincodes)
// Without broadcasting, pincode_map is shipped with every task
val rdd_of_cities = RDDpincodes.map(pincode => pincode_map(pincode))
print(rdd_of_cities.collect().mkString("\n"))
The same code with a broadcast variable looks like this:
val pincode_map = Map(11 -> "Delhi", 12 -> "Jaipur", 14 -> "Chandigarh", 17 -> "Shimla", 18 -> "Jammu", 19 -> "Srinagar") // example values
val list_of_pincodes = List(14, 19, 18, 18, 12, 17, 11, 19, 17, 19, 19)
val RDDpincodes = sc.parallelize(list_of_pincodes)
// Broadcast the lookup table once; each executor caches it locally
val pincode_map_broadcast = sc.broadcast(pincode_map)
// Access the value from the broadcast variable
val rdd_of_cities = RDDpincodes.map(pincode => pincode_map_broadcast.value(pincode))
print(rdd_of_cities.collect().mkString("\n"))
broadcastVar.value
- The value method returns the content of the broadcast variable.
- Once the variable is broadcast, each task accesses the local copy cached on its executor during task execution.
- By default the broadcast value is cached on every machine that runs one of the tasks.
When we run a Spark job containing broadcast variables, Spark does the following:
- It breaks the job into stages separated by distributed shuffle operations, and executes the actions within each stage.
- Each stage is further broken into tasks.
- Spark broadcasts the common data (reusable) needed by tasks within each stage.
- The broadcasted data is cached in serialized format and deserialized before executing each task.
- broadcastVar.unpersist removes the cached copies of the broadcast variable from the executors; the data is re-broadcast if the variable is used again.
- broadcastVar.destroy removes all data and metadata related to the broadcast variable from both the executors and the driver; the variable cannot be used again afterwards.
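Taken together, a typical broadcast lifecycle looks like this fragment (assuming an active SparkContext sc; it runs only inside a Spark application):

```scala
val broadcastVar = sc.broadcast(Map(1 -> "a", 2 -> "b")) // shipped to executors once
// ... tasks read broadcastVar.value ...
broadcastVar.unpersist() // drop executor-side copies; re-broadcast on next use
broadcastVar.destroy()   // release all resources; the variable is unusable afterwards
```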
Use case 1
// Without broadcasting: the local value is shipped with every task
val input = sc.parallelize(List(1, 2, 3))
val localVal = 2
val added = input.map(x => x + localVal)
added.foreach(println)
val multiplied = input.map(x => x * 2)
multiplied.foreach(println)
// With a broadcast variable: the value is shipped once and cached on each executor
val broadcastVar = sc.broadcast(2)
val addedWithBroadcast = input.map(x => broadcastVar.value + x)
addedWithBroadcast.foreach(println)
Use case 2
val emp = sc.parallelize(Seq(("Jhon", 1), ("Martin", 2), ("Nancy", 3), ("Bob", 2), ("Sony", 4), ("Smith", 3)))
val dp = Map(1 -> 'a', 2 -> 'b', 3 -> 'c')
val dep = sc.broadcast(dp)
// Look up each employee's department from the broadcast map; getOrElse avoids
// an exception for ids (such as 4) that are missing from the map
emp.map(x => x._1 + "," + x._2 + "," + dep.value.getOrElse(x._2, "n/a")).collect().foreach(println)
Accumulator
Another kind of shared variable in distributed computing is the accumulator. While broadcast variables are read-only, accumulators are shared variables that can be operated on (added to) from the various tasks running as part of a job. Accumulators help with a number of use cases, for example counting the number of failed records across the whole cluster, the total number of records associated with a product ID, or the number of basket check-outs in a window.
Spark's out-of-the-box accumulators allow multiple workers to write to a shared variable, but do not allow them to read it. Only the driver can read an accumulator's value, through its value method.
To initialise a numeric accumulator with an initial value, use the following code:
// Initialise an accumulator with name "Failed records accumulator"
val failed_records_acc = sc.longAccumulator("Failed records accumulator")
Accumulators are “added” to through an associative and commutative operation only and thus, they can be efficiently supported in parallel. So, to increment the value of an accumulator, use the following code:
// ...
records.foreach { record =>
  // foreach is an action, so the accumulator update is actually executed
  if (record.failed) failed_records_acc.add(1)
}
// ...
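Why must the operation be associative and commutative? Each task computes a partial result on its own partition, and the driver merges those partials in whatever order the tasks happen to finish. A plain-Scala sketch of this merging, using made-up data (no Spark required):

```scala
// 1 marks a failed record (hypothetical data)
val records = Seq(1, 0, 1, 1, 0, 1)
// Simulate three partitions processed by independent tasks
val partitions = records.grouped(2).toSeq
// Each task produces a partial count for its partition
val partials = partitions.map(_.sum)
// The driver may merge the partials in any order; addition gives the same total
val total1 = partials.foldLeft(0)(_ + _)
val total2 = partials.reverse.foldLeft(0)(_ + _)
println(total1 == total2) // true
println(total1)           // 4
```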
Spark not only supports numeric accumulators, but also allows programmers to create their own types by subclassing the AccumulatorV2 abstract class and implementing its methods.
import org.apache.spark.util.AccumulatorV2

/**
 * A custom accumulator for string concatenation
 */
class StringAccumulator(private var _value: String) extends AccumulatorV2[String, String] {

  def this() = this("")

  // Returns true while nothing has been accumulated yet
  override def isZero: Boolean = _value == ""

  // Creates a new accumulator holding a copy of the current value
  override def copy(): AccumulatorV2[String, String] = new StringAccumulator(_value)

  // Resets the accumulator to its zero value
  override def reset(): Unit = {
    _value = ""
  }

  // Accumulates the input into the current value
  override def add(newValue: String): Unit = {
    _value = (_value + " " + newValue.trim).trim
  }

  // Merges another accumulator of the same type into the current one
  override def merge(other: AccumulatorV2[String, String]): Unit = {
    add(other.value)
  }

  // Returns the current value; only the driver should read this
  override def value: String = _value
}
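To see the contract in action without a Spark cluster, here is a self-contained sketch that stubs out AccumulatorV2 and drives the same StringAccumulator logic by hand; in a real application you would instead import Spark's class and register the accumulator with sc.register:

```scala
// Minimal stand-in for org.apache.spark.util.AccumulatorV2, so this sketch
// runs without Spark on the classpath
abstract class AccumulatorV2[IN, OUT] {
  def isZero: Boolean
  def copy(): AccumulatorV2[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  def value: OUT
}

class StringAccumulator(private var _value: String) extends AccumulatorV2[String, String] {
  def this() = this("")
  override def isZero: Boolean = _value == ""
  override def copy(): AccumulatorV2[String, String] = new StringAccumulator(_value)
  override def reset(): Unit = { _value = "" }
  override def add(newValue: String): Unit = { _value = (_value + " " + newValue.trim).trim }
  override def merge(other: AccumulatorV2[String, String]): Unit = add(other.value)
  override def value: String = _value
}

// Drive it the way Spark would: per-task accumulation, then a driver-side merge
val taskA = new StringAccumulator()
taskA.add("hello")
taskA.add("world")
val taskB = new StringAccumulator()
taskB.add("spark")
taskA.merge(taskB)
val merged = taskA.value
println(merged)       // hello world spark
taskA.reset()
println(taskA.isZero) // true
```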
When using accumulators there are some caveats that we as programmers need to be aware of:
- Spark evaluates computations inside transformations lazily. As a result, unless some action runs, Spark does not execute accumulator updates inside functions like map() or filter().
- Spark guarantees to apply accumulator updates made inside actions only once. So restarting a task and recomputing the lineage updates the accumulator only once.
- Spark does not guarantee this for transformations. So restarting a task and recomputing the lineage might apply the accumulator update more than once.
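The last caveat can be demonstrated without Spark: if the function passed to a transformation is executed twice on the same data (for example after a task failure, or under speculative execution), a naively updated counter double-counts. A plain-Scala sketch with hypothetical data:

```scala
// A naive counter updated inside a "transformation" function
var failedCount = 0
val transform = (xs: Seq[Int]) => {
  xs.foreach(x => if (x < 0) failedCount += 1) // count "failed" (negative) records
  xs.map(math.abs)
}

val data = Seq(1, -2, 3, -4)
transform(data) // first attempt of the task
transform(data) // the same task re-executed after a failure
println(failedCount) // 4, not 2: each re-execution re-applies the updates
```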
A good rule of thumb to follow is to use accumulators only for data you would consider to be a side effect of your main data processing application.